/**
 * 
 */
package com.fnic.pearl.scheduler.mod;

import java.util.Iterator;
import java.util.Map;

import org.apache.log4j.Logger;

import com.fnic.pearl.scheduler.constant.TestTaskStatus;
import com.fnic.pearl.scheduler.dao.SchedulerStore;
import com.fnic.pearl.scheduler.model.ProbeTaskStatus;
import com.fnic.pearl.scheduler.model.TestTask;

/**
 * 测量任务调度线程
 * 主要负责判断 init 状态的测量任务是否可被调度
 * 
 * 从 1.2 版本以后采用 线程或线程池 只针对任务，而不和探针ID关联
 * 从而避免一个探针一个线程方式 导致的 探针的增多会产生过多的线程的问题
 * 在 scheduler 启动时加载
 * 
 * @author HuHaiyang
 * @date 2013年8月15日
 */
public class TestTaskSchedulerHandler implements Runnable
{
    private static Logger LOG = Logger.getLogger(TestTaskSchedulerHandler.class);
    private volatile boolean isStopped = false;
    
    // 当前调度
    public static final int RESULT_CURR_SCHEDULER = 0;
    
    // 下次调度
    public static final int RESULT_NEXT_SCHEDULER = 1;
    
    // 不能再被调度
    public static final int RESULT_CANOT_SCHEDULER = 2;
        
    /**
     * @see java.lang.Runnable#run()
     */
    public void run()
    {
        LOG.info("task scheduler start");
        while (!isStopped)
        {
            TestTask tt = null;
            try
            {
                tt = SchedulerStore.getInstance().popInitTestTask();
            }
            catch (InterruptedException e1)
            {
                // TODO 将正在执行的任务移除
                // 同时生成 失败 任务状态记录同步至 管理节点
                // 并且需要移至 init 队列
                // 因为对于需要执行多次的 任务 或 未到达结束时间
                // 应该能够等待 probe 重启后继续调度和下发任务
                Map<Long, TestTask> tasks = SchedulerStore.getInstance().popRunningTestTask(tt.getProbe());
                if (tasks != null && tasks.size() > 0)
                {
                    Iterator<TestTask> iter = tasks.values().iterator();
                    while (iter.hasNext())
                    {
                        TestTask t = iter.next();
                        ProbeTaskStatus pts = new ProbeTaskStatus(tt.getProbe(), 
                                t.getTaskId(), TestTaskStatus.S_FAILED_TASK_HANGUP);
                        pts.setExecNum(t.getExecNum());
                        SchedulerStore.getInstance().putTestTaskStatus(pts);
                        SchedulerStore.getInstance().updateTestTaskStatus(t, TestTaskStatus.S_NEW_TASK);
                        SchedulerStore.getInstance().putInitTestTask(t);
                    }
                }
                break;
            }
            
            if (tt == null)
            {
                continue;
            }
            
            switch (TestTaskSchedulerRuler.canExecute(tt))
            {
            case RESULT_CURR_SCHEDULER:
                LOG.info("current - " + tt);
                SchedulerStore.getInstance().updateTestTaskStatus(tt, TestTaskStatus.S_WISSUE);
                SchedulerStore.getInstance().putWissueTestTask(tt);
                SchedulerStore.getInstance().updateTestTaskExecNum(tt);
                
                ProbeTaskStatus pts = new ProbeTaskStatus(tt.getProbe(), 
                        tt.getTaskId(), TestTaskStatus.S_WISSUE);
                pts.setExecNum(tt.getExecNum());
                SchedulerStore.getInstance().putTestTaskStatus(pts);
                break;
                
            case RESULT_NEXT_SCHEDULER:
                SchedulerStore.getInstance().updateTestTaskStatus(tt, TestTaskStatus.S_NEW_TASK);
                SchedulerStore.getInstance().putInitTestTask(tt);
                break;
                
            case RESULT_CANOT_SCHEDULER:
                LOG.info("task over - " + tt);
                SchedulerStore.getInstance().removeTestTask(tt);
                break;
            }
        }
        LOG.info("task scheduler stop!");
    }
    
    public void setStopped(boolean stopped)
    {
        this.isStopped = stopped;
    }
}
